package com.atguigu.day01;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.*;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * Batch word count built on Flink's DataSet API.
 *
 * <p>Pipeline: read a text file line by line, split each line into words, map every word to a
 * {@code (word, 1)} tuple, group the tuples by word (field 0), sum the counts (field 1), and print
 * the result to the console.
 *
 * <p>NOTE(review): newer Flink releases deprecate the DataSet API in favour of the DataStream API
 * in BATCH execution mode; confirm against the Flink version this project targets.
 */
public class Flink01_Batch_WordCount {

    /** Input file that is read when no path is passed on the command line. */
    private static final String DEFAULT_INPUT_PATH = "input/word.txt";

    /**
     * Runs the word count job.
     *
     * @param args optional; {@code args[0]} overrides the input path
     *             (defaults to {@value #DEFAULT_INPUT_PATH})
     * @throws Exception if building or executing the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        // 1. Create the batch execution environment.
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // 2. Read the input file; each element of the data set is one line of text.
        String inputPath = args.length > 0 ? args[0] : DEFAULT_INPUT_PATH;
        DataSource<String> dataSource = env.readTextFile(inputPath);

        // 3. Split each line into words (anonymous-class implementation).
        //    MyFlatMap below is an alternative that emits (word, 1) tuples directly:
        //    dataSource.flatMap(new MyFlatMap())
        FlatMapOperator<String, String> words = dataSource.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                for (String word : splitWords(value)) {
                    out.collect(word);
                }
            }
        });

        // Map every word to a (word, 1) tuple (lambda implementation).
        // The explicit returns(...) type hint is required: the lambda's generic result type
        // Tuple2<String, Integer> is erased by the Java compiler, so Flink cannot infer it.
        MapOperator<String, Tuple2<String, Integer>> wordToOne =
                words.map(w -> Tuple2.of(w, 1)).returns(Types.TUPLE(Types.STRING, Types.INT));

        // 4. Group tuples that share the same word (tuple field 0).
        UnsortedGrouping<Tuple2<String, Integer>> groupBy = wordToOne.groupBy(0);

        // 5. Sum the counts (tuple field 1) within each group.
        AggregateOperator<Tuple2<String, Integer>> result = groupBy.sum(1);

        // 6. Print to the console. In the DataSet API print() triggers job execution itself,
        //    which is why no env.execute() call follows.
        result.print();
    }

    /**
     * Splits a line into words on runs of whitespace, discarding empty tokens.
     *
     * <p>A plain {@code split(" ")} yields empty strings for empty lines, leading spaces and
     * consecutive spaces, and those empty strings would then be counted as a "word".
     *
     * @param line one line of input text
     * @return the non-empty words of {@code line}; an empty array for a blank line
     */
    private static String[] splitWords(String line) {
        String trimmed = line.trim();
        return trimmed.isEmpty() ? new String[0] : trimmed.split("\\s+");
    }

    /**
     * Named-class alternative to the anonymous flatMap in {@link #main}: splits a line into words
     * and emits a {@code (word, 1)} tuple for each one, combining the split and the tuple-mapping
     * steps. Not used by {@link #main}; usage: {@code dataSource.flatMap(new MyFlatMap())}.
     */
    public static class MyFlatMap implements FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word : splitWords(value)) {
                // Send each word downstream with an initial count of 1.
                out.collect(Tuple2.of(word, 1));
            }
        }
    }
}
